1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: JmsSession.java,v 1.6 2007/01/24 12:00:28 tanderson Exp $
44 */
45 package org.exolab.jms.client;
46
47 import java.io.Serializable;
48 import java.util.ArrayList;
49 import java.util.HashMap;
50 import java.util.List;
51 import java.util.Vector;
52 import javax.jms.BytesMessage;
53 import javax.jms.Connection;
54 import javax.jms.Destination;
55 import javax.jms.IllegalStateException;
56 import javax.jms.InvalidDestinationException;
57 import javax.jms.InvalidSelectorException;
58 import javax.jms.JMSException;
59 import javax.jms.MapMessage;
60 import javax.jms.Message;
61 import javax.jms.MessageConsumer;
62 import javax.jms.MessageListener;
63 import javax.jms.MessageProducer;
64 import javax.jms.ObjectMessage;
65 import javax.jms.Queue;
66 import javax.jms.QueueBrowser;
67 import javax.jms.Session;
68 import javax.jms.StreamMessage;
69 import javax.jms.TemporaryQueue;
70 import javax.jms.TemporaryTopic;
71 import javax.jms.TextMessage;
72 import javax.jms.Topic;
73 import javax.jms.TopicSubscriber;
74
75 import org.apache.commons.logging.Log;
76 import org.apache.commons.logging.LogFactory;
77
78 import org.exolab.jms.message.BytesMessageImpl;
79 import org.exolab.jms.message.MapMessageImpl;
80 import org.exolab.jms.message.MessageConverter;
81 import org.exolab.jms.message.MessageConverterFactory;
82 import org.exolab.jms.message.MessageImpl;
83 import org.exolab.jms.message.MessageSessionIfc;
84 import org.exolab.jms.message.ObjectMessageImpl;
85 import org.exolab.jms.message.StreamMessageImpl;
86 import org.exolab.jms.message.TextMessageImpl;
87 import org.exolab.jms.server.ServerSession;
88
89
90 /***
91 * Client implementation of the <code>javax.jms.Session</code> interface.
92 *
93 * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
94 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
95 * @version $Revision: 1.6 $ $Date: 2007/01/24 12:00:28 $
96 */
97 class JmsSession implements Session, JmsMessageListener, MessageSessionIfc {
98
99 /***
100 * The owner of the session.
101 */
102 private JmsConnection _connection;
103
104 /***
105 * The proxy to the remote session implementation.
106 */
107 private ServerSession _session = null;
108
109 /***
110 * If true, indicates that the session has been closed.
111 */
112 private volatile boolean _closed = false;
113
114 /***
115 * Determines if this session is being closed.
116 */
117 private boolean _closing = false;
118
119 /***
120 * Synchronization helper, used during close().
121 */
122 private final Object _closeLock = new Object();
123
124 /***
125 * This flag determines whether message delivery is enabled or disabled.
126 * Message delivery is disabled if the enclosing connection is stopped.
127 */
128 private boolean _stopped = true;
129
130 /***
131 * Indicates whether the consumer or the client will acknowledge any
132 * messages it receives. Ignored if the session is transacted. Legal values
133 * are <code>Session.AUTO_ACKNOWLEDGE</code>, <code>Session.CLIENT_ACKNOWLEDGE</code>
134 * and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
135 */
136 private final int _ackMode;
137
138 /***
139 * Maintains a map of JmsMessageConsumer.getConsumerId() ->
140 * JmsMessageConsumer objects.
141 */
142 private HashMap _consumers = new HashMap();
143
144 /***
145 * Maintains a list of producers for the session.
146 */
147 private List _producers = new ArrayList();
148
149 /***
150 * Maintain a collection of acked messages for a transacted session. These
151 * messages are only sent to the server on commit.
152 */
153 private List _messagesToSend = new ArrayList();
154
155 /***
156 * This is the session's session listener which is used to receive all
157 * messages associated with all consumers registered with this session.
158 */
159 private MessageListener _listener = null;
160
161 /***
162 * The message cache holds all messages for the session, allocated by a
163 * JmsConnectionConsumer.
164 */
165 private Vector _messageCache = new Vector();
166
167 /***
168 * Monitor used to block consumers, if the session has been stopped, or no
169 * messages are available.
170 */
171 private final Object _receiveLock = new Object();
172
173 /***
174 * The identifier of the consumer performing a blocking receive, or
175 * <code>-1</code> if no consumer is currently performing a blocking
176 * receive.
177 */
178 private long _blockingConsumer = -1;
179
180 /***
181 * The logger.
182 */
183 private static final Log _log = LogFactory.getLog(JmsSession.class);
184
185
186 /***
187 * Construct a new <code>JmsSession</code>
188 *
189 * @param connection the owner of the session
190 * @param transacted if <code>true</code>, the session is transacted.
191 * @param ackMode indicates whether the consumer or the client will
192 * acknowledge any messages it receives. This parameter
193 * will be ignored if the session is transacted. Legal
194 * values are <code>Session.AUTO_ACKNOWLEDGE</code>,
195 * <code>Session.CLIENT_ACKNOWLEDGE</code> and
196 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
197 * @throws JMSException if the session cannot be created
198 */
199 public JmsSession(JmsConnection connection, boolean transacted,
200 int ackMode) throws JMSException {
201 if (connection == null) {
202 throw new IllegalArgumentException("Argument 'connection' is null");
203 }
204
205 _connection = connection;
206 _ackMode = (transacted) ? SESSION_TRANSACTED : ackMode;
207
208
209 _session = connection.getServerConnection().createSession(_ackMode,
210 transacted);
211
212
213 _session.setMessageListener(this);
214
215
216 if (!connection.isStopped()) {
217 start();
218 }
219 }
220
221 /***
222 * Creates a <code>BytesMessage</code> object. A <code>BytesMessage</code>
223 * object is used to send a message containing a stream of uninterpreted
224 * bytes.
225 *
226 * @throws JMSException if the JMS provider fails to create this message due
227 * to some internal error.
228 */
229 public BytesMessage createBytesMessage() throws JMSException {
230 ensureOpen();
231 return new BytesMessageImpl();
232 }
233
234 /***
235 * Creates a <code>MapMessage</code> object. A <code>MapMessage</code>
236 * object is used to send a self-defining set of name-value pairs, where
237 * names are <code>String</code> objects and values are primitive values in
238 * the Java programming language.
239 *
240 * @throws JMSException if the JMS provider fails to create this message due
241 * to some internal error.
242 */
243 public MapMessage createMapMessage() throws JMSException {
244 ensureOpen();
245 return new MapMessageImpl();
246 }
247
248 /***
249 * Creates a <code>Message</code> object. The <code>Message</code> interface
250 * is the root interface of all JMS messages. A <code>Message</code> object
251 * holds all the standard message header information. It can be sent when a
252 * message containing only header information is sufficient.
253 *
254 * @throws JMSException if the JMS provider fails to create this message due
255 * to some internal error.
256 */
257 public Message createMessage() throws JMSException {
258 ensureOpen();
259 return new MessageImpl();
260 }
261
262 /***
263 * Creates an <code>ObjectMessage</code> object. An <code>ObjectMessage</code>
264 * object is used to send a message that contains a serializable Java
265 * object.
266 *
267 * @throws JMSException if the JMS provider fails to create this message due
268 * to some internal error.
269 */
270 public ObjectMessage createObjectMessage() throws JMSException {
271 ensureOpen();
272 return new ObjectMessageImpl();
273 }
274
275 /***
276 * Creates an initialized <code>ObjectMessage</code> object. An
277 * <code>ObjectMessage</code> object is used to send a message that contains
278 * a serializable Java object.
279 *
280 * @param object the object to use to initialize this message
281 * @throws JMSException if the JMS provider fails to create this message due
282 * to some internal error.
283 */
284 public ObjectMessage createObjectMessage(Serializable object)
285 throws JMSException {
286 ensureOpen();
287 ObjectMessageImpl result = new ObjectMessageImpl();
288 result.setObject(object);
289 return result;
290 }
291
292 /***
293 * Creates a <code>StreamMessage</code> object. A <code>StreamMessage</code>
294 * object is used to send a self-defining stream of primitive values in the
295 * Java programming language.
296 *
297 * @throws JMSException if the JMS provider fails to create this message due
298 * to some internal error.
299 */
300 public StreamMessage createStreamMessage() throws JMSException {
301 ensureOpen();
302 return new StreamMessageImpl();
303 }
304
305 /***
306 * Creates a <code>TextMessage</code> object. A <code>TextMessage</code>
307 * object is used to send a message containing a <code>String</code>
308 * object.
309 *
310 * @throws JMSException if the JMS provider fails to create this message due
311 * to some internal error.
312 */
313 public TextMessage createTextMessage() throws JMSException {
314 ensureOpen();
315 return new TextMessageImpl();
316 }
317
318 /***
319 * Creates an initialized <code>TextMessage</code> object. A
320 * <code>TextMessage</code> object is used to send a message containing a
321 * <code>String</code>.
322 *
323 * @param text the string used to initialize this message
324 * @throws JMSException if the JMS provider fails to create this message due
325 * to some internal error.
326 */
327 public TextMessage createTextMessage(String text) throws JMSException {
328 ensureOpen();
329 TextMessageImpl result = new TextMessageImpl();
330 result.setText(text);
331 return result;
332 }
333
334 /***
335 * Determines if the session is transacted.
336 *
337 * @return <code>true</code> if the session is transacted
338 * @throws JMSException if the session is closed
339 */
340 public boolean getTransacted() throws JMSException {
341 ensureOpen();
342 return (_ackMode == SESSION_TRANSACTED);
343 }
344
345 /***
346 * Returns the acknowledgement mode of the session. The acknowledgement mode
347 * is set at the time that the session is created. If the session is
348 * transacted, the acknowledgement mode is ignored.
349 *
350 * @return If the session is not transacted, returns the current
351 * acknowledgement mode for the session. If the session is
352 * transacted, returns SESSION_TRANSACTED.
353 * @throws JMSException if the JMS provider fails to return the
354 * acknowledgment mode due to some internal error.
355 * @see Connection#createSession
356 */
357 public int getAcknowledgeMode() throws JMSException {
358 ensureOpen();
359 return _ackMode;
360 }
361
362 /***
363 * Creates a <code>MessageProducer</code> to send messages to the specified
364 * destination.
365 *
366 * @param destination the <code>Destination</code> to send to, or null if
367 * this is a producer which does not have a specified
368 * destination.
369 * @throws JMSException if the session fails to create a
370 * MessageProducer due to some internal
371 * error.
372 * @throws InvalidDestinationException if an invalid destination is
373 * specified.
374 */
375 public MessageProducer createProducer(Destination destination)
376 throws JMSException {
377 ensureOpen();
378 return new JmsMessageProducer(this, destination);
379 }
380
381 /***
382 * Creates a <code>MessageConsumer</code> for the specified destination.
383 *
384 * @param destination the <code>Destination</code> to access.
385 * @throws JMSException if the session fails to create a
386 * consumer due to some internal error.
387 * @throws InvalidDestinationException if an invalid destination is
388 * specified.
389 */
390
391 public MessageConsumer createConsumer(Destination destination)
392 throws JMSException {
393 return createConsumer(destination, null);
394 }
395
396 /***
397 * Creates a <code>MessageProducer</code> to receive messages from the
398 * specified destination, matching particular selection criteria
399 *
400 * @param destination the <code>Destination</code> to access
401 * @param messageSelector only messages with properties matching the message
402 * selector expression are delivered. A value of null
403 * or an empty string indicates that there is no
404 * message selector for the message consumer.
405 * @throws JMSException if the session fails to create a
406 * MessageConsumer due to some internal
407 * error.
408 * @throws InvalidDestinationException if an invalid destination is
409 * specified.
410 * @throws InvalidSelectorException if the message selector is invalid.
411 */
412 public MessageConsumer createConsumer(Destination destination,
413 String messageSelector)
414 throws JMSException {
415 return createConsumer(destination, messageSelector, false);
416 }
417
418 /***
419 * Creates a <code>MessageConsumer</code> to receive messages from the
420 * specified destination, matching particular selection criteria. This
421 * method can specify whether messages published by its own connection
422 * should be delivered to it, if the destination is a topic. <P>In some
423 * cases, a connection may both publish and subscribe to a topic. The
424 * consumer <code>noLocal</code> attribute allows a consumer to inhibit the
425 * delivery of messages published by its own connection. The default value
426 * for this attribute is false. The <code>noLocal</code> value must be
427 * supported by destinations that are topics.
428 *
429 * @param destination the <code>Destination</code> to access
430 * @param messageSelector only messages with properties matching the message
431 * selector expression are delivered. A value of null
432 * or an empty string indicates that there is no
433 * message selector for the message consumer.
434 * @param noLocal if true, and the destination is a topic, inhibits
435 * the delivery of messages published by its own
436 * connection. The behavior for <code>noLocal</code>
437 * is not specified if the destination is a queue.
438 * @throws JMSException if the session fails to create a
439 * MessageConsumer due to some internal
440 * error.
441 * @throws InvalidDestinationException if an invalid destination is
442 * specified.
443 * @throws InvalidSelectorException if the message selector is invalid.
444 */
445 public MessageConsumer createConsumer(Destination destination,
446 String messageSelector,
447 boolean noLocal) throws JMSException {
448 long consumerId = allocateConsumer(destination, messageSelector,
449 noLocal);
450 JmsMessageConsumer consumer = new JmsMessageConsumer(this, consumerId,
451 destination,
452 messageSelector);
453 addConsumer(consumer);
454 return consumer;
455 }
456
457 /***
458 * Creates a queue identity given a <code>Queue</code> name.
459 * <p/>
460 * <P>This facility is provided for the rare cases where clients need to
461 * dynamically manipulate queue identity. It allows the creation of a queue
462 * identity with a provider-specific name. Clients that depend on this
463 * ability are not portable.
464 * <p/>
465 * <P>Note that this method is not for creating the physical queue. The
466 * physical creation of queues is an administrative task and is not to be
467 * initiated by the JMS API. The one exception is the creation of temporary
468 * queues, which is accomplished with the <code>createTemporaryQueue</code>
469 * method.
470 *
471 * @param queueName the name of this <code>Queue</code>
472 * @return a <code>Queue</code> with the given name
473 * @throws JMSException if the session fails to create a queue due to some
474 * internal error.
475 */
476 public Queue createQueue(String queueName) throws JMSException {
477 ensureOpen();
478
479 JmsQueue queue;
480
481 if (queueName != null && queueName.length() > 0) {
482 queue = new JmsQueue(queueName);
483 } else {
484 throw new JMSException(
485 "Cannot create a queue with null or empty name");
486 }
487
488 return queue;
489 }
490
491 /***
492 * Creates a topic identity given a <code>Topic</code> name.
493 * <p/>
494 * <P>This facility is provided for the rare cases where clients need to
495 * dynamically manipulate topic identity. This allows the creation of a
496 * topic identity with a provider-specific name. Clients that depend on this
497 * ability are not portable.
498 * <p/>
499 * <P>Note that this method is not for creating the physical topic. The
500 * physical creation of topics is an administrative task and is not to be
501 * initiated by the JMS API. The one exception is the creation of temporary
502 * topics, which is accomplished with the <code>createTemporaryTopic</code>
503 * method.
504 *
505 * @param topicName the name of this <code>Topic</code>
506 * @return a <code>Topic</code> with the given name
507 * @throws JMSException if the session fails to create a topic due to some
508 * internal error.
509 */
510 public Topic createTopic(String topicName) throws JMSException {
511 ensureOpen();
512
513 JmsTopic topic;
514
515 if (topicName != null && topicName.length() > 0) {
516 topic = new JmsTopic(topicName);
517 } else {
518 throw new JMSException("Invalid or null topic name specified");
519 }
520
521 return topic;
522 }
523
524 /***
525 * Creates a durable subscriber to the specified topic.
526 * <p/>
527 * <P>If a client needs to receive all the messages published on a topic,
528 * including the ones published while the subscriber is inactive, it uses a
529 * durable <code>TopicSubscriber</code>. The JMS provider retains a record
530 * of this durable subscription and insures that all messages from the
531 * topic's publishers are retained until they are acknowledged by this
532 * durable subscriber or they have expired.
533 * <p/>
534 * <P>Sessions with durable subscribers must always provide the same client
535 * identifier. In addition, each client must specify a name that uniquely
536 * identifies (within client identifier) each durable subscription it
537 * creates. Only one session at a time can have a <code>TopicSubscriber</code>
538 * for a particular durable subscription.
539 * <p/>
540 * <P>A client can change an existing durable subscription by creating a
541 * durable <code>TopicSubscriber</code> with the same name and a new topic
542 * and/or message selector. Changing a durable subscriber is equivalent to
543 * unsubscribing (deleting) the old one and creating a new one.
544 * <p/>
545 * <P>In some cases, a connection may both publish and subscribe to a topic.
546 * The subscriber <code>noLocal</code> attribute allows a subscriber to
547 * inhibit the delivery of messages published by its own connection. The
548 * default value for this attribute is false.
549 *
550 * @param topic the non-temporary <code>Topic</code> to subscribe to
551 * @param name the name used to identify this subscription
552 * @throws JMSException if the session fails to create a
553 * subscriber due to some internal
554 * error.
555 * @throws InvalidDestinationException if an invalid topic is specified.
556 */
557 public TopicSubscriber createDurableSubscriber(Topic topic, String name)
558 throws JMSException {
559 return createDurableSubscriber(topic, name, null, false);
560 }
561
562 /***
563 * Creates a durable subscriber to the specified topic, using a message
564 * selector and specifying whether messages published by its own connection
565 * should be delivered to it.
566 * <p/>
567 * <P>If a client needs to receive all the messages published on a topic,
568 * including the ones published while the subscriber is inactive, it uses a
569 * durable <code>TopicSubscriber</code>. The JMS provider retains a record
570 * of this durable subscription and insures that all messages from the
571 * topic's publishers are retained until they are acknowledged by this
572 * durable subscriber or they have expired.
573 * <p/>
574 * <P>Sessions with durable subscribers must always provide the same client
575 * identifier. In addition, each client must specify a name which uniquely
576 * identifies (within client identifier) each durable subscription it
577 * creates. Only one session at a time can have a <code>TopicSubscriber</code>
578 * for a particular durable subscription. An inactive durable subscriber is
579 * one that exists but does not currently have a message consumer associated
580 * with it.
581 * <p/>
582 * <P>A client can change an existing durable subscription by creating a
583 * durable <code>TopicSubscriber</code> with the same name and a new topic
584 * and/or message selector. Changing a durable subscriber is equivalent to
585 * unsubscribing (deleting) the old one and creating a new one.
586 *
587 * @param topic the non-temporary <code>Topic</code> to subscribe
588 * to
589 * @param name the name used to identify this subscription
590 * @param messageSelector only messages with properties matching the message
591 * selector expression are delivered. A value of
592 * null or an empty string indicates that there is no
593 * message selector for the message consumer.
594 * @param noLocal if set, inhibits the delivery of messages
595 * published by its own connection
596 * @throws JMSException if the session fails to create a
597 * subscriber due to some internal
598 * error.
599 * @throws InvalidDestinationException if an invalid topic is specified.
600 * @throws InvalidSelectorException if the message selector is invalid.
601 */
602 public TopicSubscriber createDurableSubscriber(Topic topic, String name,
603 String messageSelector,
604 boolean noLocal)
605 throws JMSException {
606 ensureOpen();
607
608 if (topic == null) {
609 throw new InvalidDestinationException("Cannot create durable subscriber: argument 'topic' is "
610 + " null");
611 }
612 if (name == null || name.trim().length() == 0) {
613 throw new JMSException("Invalid subscription name specified");
614 }
615
616
617
618 if (((JmsTopic) topic).isTemporaryDestination()) {
619 throw new InvalidDestinationException(
620 "Cannot create a durable subscriber for a temporary topic");
621 }
622
623 long consumerId = _session.createDurableConsumer((JmsTopic) topic, name,
624 messageSelector,
625 noLocal);
626 JmsTopicSubscriber subscriber = new JmsTopicSubscriber(this,
627 consumerId,
628 topic,
629 messageSelector,
630 noLocal);
631 addConsumer(subscriber);
632
633 return subscriber;
634 }
635
636 /***
637 * Creates a <code>QueueBrowser</code> object to peek at the messages on the
638 * specified queue.
639 *
640 * @param queue the queue to access
641 * @throws JMSException if the session fails to create a
642 * browser due to some internal error.
643 * @throws InvalidDestinationException if an invalid destination is
644 * specified
645 */
646 public QueueBrowser createBrowser(Queue queue) throws JMSException {
647 return createBrowser(queue, null);
648 }
649
650 /***
651 * Creates a <code>QueueBrowser</code> object to peek at the messages on the
652 * specified queue using a message selector.
653 *
654 * @param queue the <code>queue</code> to access
655 * @param messageSelector only messages with properties matching the message
656 * selector expression are delivered. A value of null
657 * or an empty string indicates that there is no
658 * message selector for the message consumer.
659 * @throws JMSException if the session fails to create a
660 * browser due to some internal error.
661 * @throws InvalidDestinationException if an invalid destination is
662 * specified
663 * @throws InvalidSelectorException if the message selector is invalid.
664 */
665 public QueueBrowser createBrowser(Queue queue, String messageSelector)
666 throws JMSException {
667 ensureOpen();
668 if (!(queue instanceof JmsQueue)) {
669 throw new InvalidDestinationException("Cannot create QueueBrowser for destination="
670 + queue);
671 }
672
673 JmsQueue dest = (JmsQueue) queue;
674
675
676 if (!checkForValidTemporaryDestination(dest)) {
677 throw new InvalidDestinationException(
678 "Cannot create a queue browser for a temporary queue "
679 + "that is not bound to this connection");
680 }
681
682 long consumerId = _session.createBrowser(dest, messageSelector);
683 JmsQueueBrowser browser = new JmsQueueBrowser(this, consumerId, queue,
684 messageSelector);
685 addConsumer(browser);
686 return browser;
687 }
688
689 /***
690 * Creates a <code>TemporaryQueue</code> object. Its lifetime will be that
691 * of the <code>Connection</code> unless it is deleted earlier.
692 *
693 * @return a temporary queue identity
694 * @throws JMSException if the session fails to create a temporary queue due
695 * to some internal error.
696 */
697 public TemporaryQueue createTemporaryQueue() throws JMSException {
698 ensureOpen();
699 return JmsTemporaryQueue.create(getConnection());
700 }
701
702 /***
703 * Creates a <code>TemporaryTopic</code> object. Its lifetime will be that
704 * of the <code>Connection</code> unless it is deleted earlier.
705 *
706 * @return a temporary topic identity
707 * @throws JMSException if the session fails to create a temporary topic due
708 * to some internal error.
709 */
710 public TemporaryTopic createTemporaryTopic() throws JMSException {
711 ensureOpen();
712 return JmsTemporaryTopic.create(getConnection());
713 }
714
715 /***
716 * Unsubscribes a durable subscription that has been created by a client.
717 * <p/>
718 * <P>This method deletes the state being maintained on behalf of the
719 * subscriber by its provider.
720 * <p/>
721 * <P>It is erroneous for a client to delete a durable subscription while
722 * there is an active <code>MessageConsumer</code> or
723 * <code>TopicSubscriber</code> for the subscription, or while a consumed
724 * message is part of a pending transaction or has not been acknowledged in
725 * the session.
726 *
727 * @param name the name used to identify this subscription
728 * @throws JMSException if the session fails to unsubscribe
729 * to the durable subscription due to
730 * some internal error.
731 * @throws InvalidDestinationException if an invalid subscription name is
732 * specified.
733 */
734 public void unsubscribe(String name) throws JMSException {
735 ensureOpen();
736 _session.unsubscribe(name);
737 }
738
739 /***
740 * Commit all messages done in this transaction
741 *
742 * @throws JMSException if the transaction cannot be committed
743 */
744 public void commit() throws JMSException {
745 ensureOpen();
746 ensureTransactional();
747
748
749 getServerSession().send(_messagesToSend);
750 _messagesToSend.clear();
751
752
753 getServerSession().commit();
754 }
755
756 /***
757 * Rollback any messages done in this transaction
758 *
759 * @throws JMSException if the transaction cannot be rolled back
760 */
761 public void rollback() throws JMSException {
762 ensureOpen();
763 ensureTransactional();
764
765
766 _messagesToSend.clear();
767
768
769 getServerSession().rollback();
770 }
771
772 /***
773 * Close the session. This call will block until a receive or message
774 * listener in progress has completed. A blocked message consumer receive
775 * call returns <code>null</code> when this session is closed.
776 *
777 * @throws JMSException if the session can't be closed
778 */
779 public void close() throws JMSException {
780 boolean closing;
781 synchronized (_closeLock) {
782 closing = _closing;
783 _closing = true;
784 }
785 if (!closing) {
786
787 stop();
788
789
790 synchronized (_receiveLock) {
791 _receiveLock.notifyAll();
792 }
793
794 _closed = true;
795
796
797 JmsMessageProducer[] producers =
798 (JmsMessageProducer[]) _producers.toArray(
799 new JmsMessageProducer[0]);
800 for (int i = 0; i < producers.length; ++i) {
801 JmsMessageProducer producer = producers[i];
802 producer.close();
803 }
804
805
806 JmsMessageConsumer[] consumers =
807 (JmsMessageConsumer[]) _consumers.values().toArray(
808 new JmsMessageConsumer[0]);
809 for (int i = 0; i < consumers.length; ++i) {
810 JmsMessageConsumer consumer = consumers[i];
811 consumer.close();
812 }
813
814
815 _connection.removeSession(this);
816 _connection = null;
817
818
819 _messagesToSend.clear();
820
821
822
823 getServerSession().close();
824 _session = null;
825 }
826 }
827
828 /***
829 * Stop message delivery in this session, and restart sending messages with
830 * the oldest unacknowledged message
831 *
832 * @throws JMSException if the session can't be recovered
833 */
834 public void recover() throws JMSException {
835 ensureOpen();
836 if (getTransacted()) {
837 throw new IllegalStateException(
838 "Cannot recover from a transacted session");
839 }
840
841 getServerSession().recover();
842 }
843
844 /***
845 * Returns the message listener associated with the session
846 *
847 * @return the message listener associated with the session, or
848 * <code>null</code> if no listener is registered
849 * @throws JMSException if the session is closed
850 */
851 public MessageListener getMessageListener() throws JMSException {
852 ensureOpen();
853 return _listener;
854 }
855
856 /***
857 * Sets the session's message listener.
858 *
859 * @param listener the session's message listener
860 * @throws JMSException if the session is closed
861 */
862 public void setMessageListener(MessageListener listener)
863 throws JMSException {
864 ensureOpen();
865 _listener = listener;
866 }
867
868 /***
869 * Iterates through the list of messages added by an {@link
870 * JmsConnectionConsumer}, sending them to the registered listener
871 */
872 public void run() {
873 try {
874 while (!_messageCache.isEmpty()) {
875 Message message = (Message) _messageCache.remove(0);
876 _listener.onMessage(message);
877 }
878 } catch (Exception exception) {
879 _log.error("Error in the Session.run()", exception);
880 } finally {
881
882 _messageCache.clear();
883 }
884 }
885
886 /***
887 * Set the message listener for a particular consumer.
888 * <p/>
889 * If a listener is already registered for the consumer, it will be
890 * automatically overwritten
891 *
892 * @param listener the message listener
893 * @throws JMSException if the listener can't be set
894 */
895 public void setMessageListener(JmsMessageConsumer listener)
896 throws JMSException {
897 ensureOpen();
898 setAsynchronous(listener.getConsumerId(), true);
899 }
900
901 /***
902 * Remove a message listener
903 *
904 * @param listener the message listener to remove
905 * @throws JMSException if the listener can't be removed
906 */
907 public void removeMessageListener(JmsMessageConsumer listener)
908 throws JMSException {
909 ensureOpen();
910 setAsynchronous(listener.getConsumerId(), false);
911 }
912
913 /***
914 * This will start message delivery to this session. If message delivery has
915 * already started, or the session is currently being closed then this is a
916 * no-op.
917 *
918 * @throws JMSException if message delivery can't be started
919 */
920 public void start() throws JMSException {
921 ensureOpen();
922 synchronized (_closeLock) {
923 if (_stopped && !_closing) {
924 getServerSession().start();
925 _stopped = false;
926 }
927 }
928 }
929
930 /***
931 * This will stop message delivery to this session. If message delivery has
932 * already stopped then this is a no-op.
933 *
934 * @throws JMSException if message delivery can't be stopped
935 */
936 public void stop() throws JMSException {
937 ensureOpen();
938 synchronized (_closeLock) {
939 if (!_stopped) {
940 getServerSession().stop();
941 _stopped = true;
942 }
943 }
944 }
945
946 /***
947 * Acknowledge the specified message. This is only applicable for
948 * CLIENT_ACKNOWLEDGE sessions. For other session types, the request is
949 * ignored.
950 * <p/>
951 * Acking a message automatically acks all those that have come before it.
952 *
953 * @param message the message to acknowledge
954 * @throws JMSException if the message can't be acknowledged
955 */
956 public void acknowledgeMessage(Message message) throws JMSException {
957 ensureOpen();
958 if (_ackMode == Session.CLIENT_ACKNOWLEDGE) {
959 MessageImpl impl = (MessageImpl) message;
960 getServerSession().acknowledgeMessage(impl.getConsumerId(),
961 impl.getAckMessageID());
962 }
963 }
964
965 /***
966 * Enable or disable asynchronous message delivery for the specified
967 * consumer.
968 *
969 * @param consumerId the consumer identifier
970 * @param enable <code>true</code> to enable; <code>false</code> to
971 * disable
972 * @throws JMSException if message delivery cannot be enabled or disabled
973 */
974 public void setAsynchronous(long consumerId, boolean enable)
975 throws JMSException {
976 ensureOpen();
977 getServerSession().setAsynchronous(consumerId, enable);
978 }
979
980 /***
981 * Deliver a message.
982 *
983 * @param message the message to deliver
984 * @return <code>true</code> if the message was delivered; otherwise
985 * <code>false</code>.
986 */
987 public boolean onMessage(MessageImpl message) {
988 boolean delivered = false;
989 message.setJMSXRcvTimestamp(System.currentTimeMillis());
990
991 long consumerId = message.getConsumerId();
992 JmsMessageConsumer consumer
993 = (JmsMessageConsumer) _consumers.get(new Long(consumerId));
994
995 message.setSession(this);
996 if (consumer != null) {
997
998
999
1000
1001 if (_listener != null) {
1002 try {
1003 _listener.onMessage(message);
1004 delivered = true;
1005 } catch (Throwable exception) {
1006 _log.error("MessageListener threw exception", exception);
1007 }
1008 } else {
1009 delivered = consumer.onMessage(message);
1010 }
1011 } else {
1012 _log.error("Received a message for an inactive consumer");
1013 }
1014 return delivered;
1015 }
1016
1017 /***
1018 * Inform the session that there is a message available for a synchronous
1019 * consumer.
1020 */
1021 public void onMessageAvailable() {
1022
1023 notifyConsumer();
1024 }
1025
1026 /***
1027 * Receive the next message that arrives within the specified timeout
1028 * interval. This call blocks until a message arrives, the timeout expires,
1029 * or this message consumer is closed. A timeout of <code>0</code> never
1030 * expires and the call blocks indefinitely.
1031 *
1032 * @param consumerId the consumer identifier
1033 * @param timeout the timeout interval, in milliseconds
1034 * @return the next message produced for the consumer, or <code>null</code>
1035 * if the timeout expires or the consumer is concurrently closed
1036 * @throws JMSException if the next message can't be received
1037 */
1038 public MessageImpl receive(long consumerId, long timeout)
1039 throws JMSException {
1040 MessageImpl message = null;
1041 ensureOpen();
1042
1043 synchronized (_receiveLock) {
1044 if (_blockingConsumer != -1) {
1045 throw new IllegalStateException(
1046 "Session cannot be accessed concurrently");
1047 }
1048
1049 _blockingConsumer = consumerId;
1050
1051 long start = (timeout != 0) ? System.currentTimeMillis() : 0;
1052 try {
1053 while (message == null && !isClosed()) {
1054 if (timeout == 0) {
1055 message = getServerSession().receive(consumerId, 0);
1056 } else {
1057 message = getServerSession().receive(consumerId,
1058 timeout);
1059 }
1060 if (message == null && !isClosed()) {
1061
1062
1063
1064 try {
1065 if (timeout == 0) {
1066 _receiveLock.wait();
1067 } else {
1068 long elapsed = System.currentTimeMillis()
1069 - start;
1070 if (elapsed >= timeout) {
1071
1072 break;
1073 } else {
1074
1075
1076
1077 timeout -= elapsed;
1078 }
1079 _receiveLock.wait(timeout);
1080 }
1081 } catch (InterruptedException ignore) {
1082
1083 }
1084 }
1085 }
1086
1087 if (message != null) {
1088 message.setSession(this);
1089 if (_ackMode == AUTO_ACKNOWLEDGE
1090 || _ackMode == DUPS_OK_ACKNOWLEDGE) {
1091 getServerSession().acknowledgeMessage(
1092 message.getConsumerId(),
1093 message.getMessageId().toString());
1094 }
1095 }
1096 } finally {
1097 _blockingConsumer = -1;
1098 }
1099 }
1100 return message;
1101 }
1102
1103 /***
1104 * Receive the next message if one is immediately available.
1105 *
1106 * @param consumerId the consumer identifier
1107 * @return the next message produced for this consumer, or <code>null</code>
1108 * if one is not available
1109 * @throws JMSException if the next message can't be received
1110 */
1111 public MessageImpl receiveNoWait(long consumerId) throws JMSException {
1112 ensureOpen();
1113 MessageImpl message = getServerSession().receiveNoWait(consumerId);
1114 if (message != null) {
1115 message.setSession(this);
1116 if (_ackMode == AUTO_ACKNOWLEDGE
1117 || _ackMode == DUPS_OK_ACKNOWLEDGE) {
1118 getServerSession().acknowledgeMessage(
1119 message.getConsumerId(),
1120 message.getMessageId().toString());
1121 }
1122 }
1123 return message;
1124 }
1125
1126 /***
1127 * Browse up to count messages.
1128 *
1129 * @param consumerId the consumer identifier
1130 * @param count the maximum number of messages to receive
1131 * @return a list of {@link MessageImpl} instances
1132 * @throws JMSException for any JMS error
1133 */
1134 public List browse(long consumerId, int count)
1135 throws JMSException {
1136 ensureOpen();
1137 return getServerSession().browse(consumerId, count);
1138 }
1139
1140 /***
1141 * Send the specified message to the server.
1142 *
1143 * @param message the message to send
1144 * @throws JMSException if the message can't be sent
1145 */
1146 protected void sendMessage(Message message) throws JMSException {
1147 if (getTransacted()) {
1148
1149
1150 if (message instanceof MessageImpl) {
1151 try {
1152 message = (Message) ((MessageImpl) message).clone();
1153 } catch (CloneNotSupportedException error) {
1154 throw new JMSException(error.getMessage());
1155 }
1156 } else {
1157 message = convert(message);
1158 }
1159 _messagesToSend.add(message);
1160 } else {
1161 if (!(message instanceof MessageImpl)) {
1162 message = convert(message);
1163 }
1164 getServerSession().send((MessageImpl) message);
1165 }
1166 }
1167
1168 /***
1169 * Returns the server session.
1170 *
1171 * @return the server session
1172 */
1173 protected ServerSession getServerSession() {
1174 return _session;
1175 }
1176
1177 /***
1178 * Return a reference to the connection that created this session.
1179 *
1180 * @return the owning connection
1181 */
1182 protected JmsConnection getConnection() {
1183 return _connection;
1184 }
1185
1186 /***
1187 * Creates a new message consumer, returning its identity.
1188 *
1189 * @param destination the destination to access
1190 * @param selector the message selector. May be <code>null</code>
1191 * @param noLocal if true, and the destination is a topic, inhibits the
1192 * delivery of messages published by its own connection.
1193 * The behavior for <code>noLocal</code> is not specified
1194 * if the destination is a queue.
1195 * @throws JMSException if the session fails to create a
1196 * MessageConsumer due to some internal
1197 * error.
1198 * @throws InvalidDestinationException if an invalid destination is
1199 * specified.
1200 * @throws InvalidSelectorException if the message selector is invalid.
1201 */
1202 protected long allocateConsumer(Destination destination,
1203 String selector, boolean noLocal)
1204 throws JMSException {
1205 ensureOpen();
1206
1207 if (!(destination instanceof JmsDestination)) {
1208 throw new InvalidDestinationException("Cannot create MessageConsumer for destination="
1209 + destination);
1210 }
1211 JmsDestination dest = (JmsDestination) destination;
1212
1213
1214
1215 if (!checkForValidTemporaryDestination(dest)) {
1216 throw new InvalidDestinationException(
1217 "Trying to create a MessageConsumer for a temporary "
1218 + "destination that is not bound to this connection");
1219 }
1220
1221 return _session.createConsumer(dest, selector, noLocal);
1222 }
1223
1224 /***
1225 * This method checks the destination. If the destination is not temporary
1226 * then return true. If it is a temporary destination and it is owned by
1227 * this session's connection then it returns true. If it is a temporary
1228 * destination and it is owned by another connection then it returns false
1229 *
1230 * @param destination the destination to check
1231 * @return <code>true</code> if the destination is valid
1232 */
1233 protected boolean checkForValidTemporaryDestination(
1234 JmsDestination destination) {
1235 boolean result = false;
1236
1237 if (destination.isTemporaryDestination()) {
1238 JmsTemporaryDestination temp =
1239 (JmsTemporaryDestination) destination;
1240
1241
1242
1243 if (temp.validForConnection(getConnection())) {
1244 result = true;
1245 }
1246 } else {
1247 result = true;
1248 }
1249
1250 return result;
1251 }
1252
1253 /***
1254 * Add a consumer to the list of consumers managed by this session.
1255 *
1256 * @param consumer the consumer to add
1257 */
1258 protected void addConsumer(JmsMessageConsumer consumer) {
1259 _consumers.put(new Long(consumer.getConsumerId()), consumer);
1260 }
1261
1262 /***
1263 * Remove a consumer, deregistering it on the server.
1264 *
1265 * @param consumer the consumer to remove
1266 * @throws JMSException if removal fails
1267 */
1268 protected void removeConsumer(JmsMessageConsumer consumer)
1269 throws JMSException {
1270 long consumerId = consumer.getConsumerId();
1271 try {
1272 _session.closeConsumer(consumerId);
1273 } finally {
1274 _consumers.remove(new Long(consumerId));
1275 }
1276 }
1277
1278 /***
1279 * Add a producer to the list of producers managed by this session.
1280 *
1281 * @param producer the producer to add
1282 */
1283 protected void addProducer(JmsMessageProducer producer) {
1284 _producers.add(producer);
1285 }
1286
1287 /***
1288 * Remove the producer from the list of managed producers.
1289 *
1290 * @param producer the producer to remove
1291 */
1292 protected void removeProducer(JmsMessageProducer producer) {
1293 _producers.remove(producer);
1294 }
1295
1296 /***
1297 * Check if the session is closed.
1298 *
1299 * @return <code>true</code> if the session is closed
1300 */
1301 protected final boolean isClosed() {
1302 return _closed;
1303 }
1304
1305 /***
1306 * Add a message to the message cache. This message will be processed when
1307 * the run() method is called.
1308 *
1309 * @param message the message to add.
1310 */
1311 protected void addMessage(Message message) {
1312 _messageCache.add(message);
1313 }
1314
1315 /***
1316 * Verifies that the session isn't closed.
1317 *
1318 * @throws IllegalStateException if the session is closed
1319 */
1320 protected void ensureOpen() throws IllegalStateException {
1321 if (isClosed()) {
1322 throw new IllegalStateException(
1323 "Cannot perform operation - session has been closed");
1324 }
1325 }
1326
1327 /***
1328 * Verifies that the session is transactional.
1329 *
1330 * @throws IllegalStateException if the session isn't transactional
1331 */
1332 private void ensureTransactional() throws IllegalStateException {
1333 if (_ackMode != SESSION_TRANSACTED) {
1334 throw new IllegalStateException(
1335 "Cannot perform operatiorn - session is not transactional");
1336 }
1337 }
1338
1339 /***
1340 * Notifies any blocking synchronous consumer.
1341 */
1342 private void notifyConsumer() {
1343 synchronized (_receiveLock) {
1344 _receiveLock.notifyAll();
1345 }
1346 }
1347
1348 /***
1349 * Convert a message to its corresponding OpenJMS implementation.
1350 *
1351 * @param message the message to convert
1352 * @return the OpenJMS implementation of the message
1353 * @throws JMSException for any error
1354 */
1355 private Message convert(Message message) throws JMSException {
1356 MessageConverter converter =
1357 MessageConverterFactory.create(message);
1358 return converter.convert(message);
1359 }
1360
1361 }
1362